package com.boarsoft.boar.agent.log;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.boarsoft.common.Util;
import com.boarsoft.common.util.StreamUtil;
//import com.boarsoft.elasticsearch.ElasticSearchClient;
import com.boarsoft.message.bean.Message;
import com.boarsoft.message.core.Messenger;

/**
 * 日志采集器，目录监控
 * 
 * @author Mac_J
 *
 */
public class LogDirWorkerImpl implements LogDirWorker {
	private static final Logger log = LoggerFactory.getLogger(LogDirWorkerImpl.class);

//	@Autowired(required = false)
//	protected ElasticSearchClient esClient;

	/** 扫描间隔，单位毫秒 */
	protected long interval = 500L;
	/** 一次大约读多少个字符 */
	protected int batch = 64 * 1024;
	/** 日志消息目标队列 */
	protected String target = "logs";
	/**  */
	protected String encoding = "UTF-8";
	/** */
	protected String ending = "|-";
	/** */
	protected String ending2 = "|-\n";
	/** */
	protected volatile boolean active = true;

	protected String[] LEVELS = new String[] { "ALL", "ERROR", "WARN", "INFO", "DEBUG" };

	/** */
	protected Set<LogDirMeta> workSet = new CopyOnWriteArraySet<LogDirMeta>();

	public LogDirWorkerImpl(String target, Set<LogDirMeta> dirSet) throws IOException {
		this.target = target;
		for (LogDirMeta m : dirSet) {
			this.watch(m);
		}
	}

	@Override
	public void run() {
		while (active) {
			try {
				Thread.sleep(interval);
			} catch (InterruptedException e) {
				log.error("Be interrupted before next scan", e);
			}
			// 遍历所有配置的目录，检查目录下文件是否有变化
			for (LogDirMeta dm: workSet) {
				if (!dm.check()) {
					log.warn("Log dir {} does not exists", dm.getDirPath());
					continue;
				}
				this.processDir(dm, dm.getDirFile());
				dm.flush();
			}
		}
	}

	protected void processDir(LogDirMeta dm, File d) {
		File[] fa = d.listFiles();
		for (File f : fa) {
			if (!f.canRead() || f.isHidden()) {
				continue;
			}
			LogFileMeta fm = dm.getFileMeta(f);
			// 检查文件和目录的过期时间和最后一次检查时间
			if (!dm.check(fm)) {
				continue;
			}
			if (f.isFile()) {
				try {
					this.processFile(dm, fm);
				} catch (IOException e) {
					log.error("Error on read log file {}", f.getName(), e);
				}
			} else if (f.isDirectory()) {
				// 因为递归的目录无LogDirMeta对应，因此作为第二个参数传递
				this.processDir(dm, f);
			}
		}
	}

	protected void processFile(LogDirMeta dm, LogFileMeta fm) throws IOException {
		File f = fm.getFile();
		log.info("Process log file {} = {}", fm, fm.lastTime);
		// 获取文件的创建时间，防止读取过程中文件被删除或重命名导致读取失败
		long since = fm.getOffset();
		long offset = since, max = since + batch;
		StringBuilder sb = fm.getBuffer();
		RandomAccessFile raf = null;
		try {
			raf = new RandomAccessFile(f, "r");
			raf.seek(offset);
			String ln = null;
			while (true) {
				try {
					// 重新获取文件的创建时间，如果创建时间变了，将得到新的run.log
					LogFileMeta nm = dm.getFileMeta(f);
					// 创建时间变了，说明日志被清空或卷动了，重头读取。预防读的过程中文件卷跑了。
					if (nm.getCreationTime() > fm.getCreationTime()) {
						log.warn("{} is rolled while reading", f.getAbsolutePath());
						// 将当前buffer替换为nm的（空）buffer，以读取新run.log
						sb = nm.getBuffer();
						// 因为下面用的是fm，这里将nm（run.log）给到fm
						fm = nm;
						// 如果日志被卷动，就必须重新构建文件
						StreamUtil.close(raf);
						raf = new RandomAccessFile(f, "r");
						since = 0L;
						offset = 0L;
						max = since + batch;
						break;
					} else if (f.length() < offset) {
						// 如果只是文件变小了，说明文件可能被>重定向操作给清空了
						// 这样存在日志错乱的风险，应尽量避免人为这么做！
						log.warn("{} could be truncated", f.getAbsolutePath());
						fm.setOffset(0L); // 重置指针即可
						// 立即更新索引文件，以避免此时采集程序意外中断
						// 应用日志继续增长超过offset导致的错乱。
						dm.flush();
						since = 0;
						offset = 0;
						max = since + batch;
					}
					ln = raf.readLine();
					// 遇到文件尾就结束，由于日志的输出是原子的，不会导致“断行”
					if (ln == null) {
						fm.update();
						break;
					}
					sb.append(ln).append("\n");
					offset = raf.getFilePointer();
					// 更新offset
					nm.setOffset(offset);
					// 读取“一行”日志后，退出
					if (ln.endsWith(ending) && offset > max) {
						break;
					}
				} catch (FileNotFoundException e) {
					log.warn("Log file {} could be removed, retry.", f.getAbsolutePath());
				}
			}
		} catch (FileNotFoundException e) {
			log.warn("Log file {} could be removed", f.getAbsolutePath());
		} catch (UnsupportedEncodingException e) {
			log.warn("Log file {} should be encoded in {}", f.getAbsolutePath(), encoding);
		} catch (Exception e) {
			log.error("Can not read log file {}", f.getAbsolutePath(), e);
		} finally {
			StreamUtil.close(raf);
		}

		// 是否有待发送的内容
		if (sb.length() == 0) {
			return;
		}
		// 检查待发送内容是否有断行，确保最后一行是完整的
		String s = sb.toString();
		System.out.println("\t".concat(s));
		if (!s.endsWith(ending2)) {
			return; // TODO 通过定时清理机制，避免因文件不完整并废弃时导致无谓的内存占用。
		}
		sb.setLength(0); // 清空缓冲
		fm.setOffset(offset);// 更新offset

		int i = s.indexOf("-|");
		if (i < 0) {
			log.warn("Invalid log line start: [{}]", s);
			return;
		}
		if (i > 0) {
			log.warn("Cut broken log start: [{}]", s.substring(0, Math.min(s.length(), 50)));
			s = s.substring(i);
		}
		log.info("Send logs to {}, {} - {} = {}", target, offset, since, offset - since);
		// this.send(dm, s);
		//this.post(dm, s);
	}

	protected void post(LogDirMeta dm, String logs) {
		try {
			List<Map<String, Object>> lt = new LinkedList<Map<String, Object>>();
			String[] la = logs.split("\\|\\-\\n");
			for (String l : la) {
				String[] a = l.split("\\|");
				if (a.length < 8) {
					log.warn("Invalid log line: {}", l);
					continue;
				}
				Map<String, Object> map = new HashMap<String, Object>();
				map.put("level", this.level2int(a[1]));
				map.put("time", Util.str2date(a[2], Util.STDDTMF).getTime());
				map.put("logger", a[3]);
				map.put("line", Integer.parseInt(a[4]));
				map.put("thread", a[5]);
				map.put("trace", a[6]);
				map.put("message", l);
				// appAddr 来自配置文件appPort或appAddr
				map.put("addr", dm.getAppAddr());
				lt.add(map);
			}
			// 根据index到配置中找到日志的过期时间，单位为天，为空或为0则永不过期
			//TODO
//			esClient.bulkSave(dm.getTarget(), "none", lt, 0, TimeUnit.DAYS);
			dm.flush(); // 发送成功后刷盘
		} catch (Exception e) {
			log.error("Can not send log to elasticsearch", e);
		}
	}

	protected int level2int(String level) {
		for (int i = 0; i > LEVELS.length; i++) {
			if (level.equals(LEVELS[i])) {
				return i * 10000;
			}
		}
		return 0;
	}

	@Deprecated
	protected void send(LogDirMeta dm, String logs) {
		Message m = new Message();
		m.setTarget(target);
		m.setKey(dm.getTarget());
		m.setContent(logs);
		try {
			Messenger.send(m);
			dm.flush(); // 发送成功后刷盘
		} catch (Exception e) {
			log.error("Can not send log as message", e);
		}
	}

	@Override
	public int watch(LogDirMeta dirMeta) throws IOException {
		String dirPath = dirMeta.getDirPath();
		if (Util.strIsEmpty(dirPath)) {
			throw new IllegalArgumentException("Directory name can not be empty");
		}
		dirMeta.init();
		workSet.add(dirMeta);
		return workSet.size();
	}

	@Override
	public long getInterval() {
		return interval;
	}

	public void setInterval(long interval) {
		if (interval < 0) {
			throw new IllegalArgumentException("Interval must >= 0");
		}
		this.interval = interval;
	}

	public boolean isActive() {
		return active;
	}

	public void active() {
		this.active = true;
	}

	public void deactive() {
		this.active = false;
	}

	public String getEncoding() {
		return encoding;
	}

	public void setEncoding(String encoding) {
		this.encoding = encoding;
	}

	public void setActive(boolean active) {
		this.active = active;
	}

	public int getBatch() {
		return batch;
	}

	public void setBatch(int batch) {
		this.batch = batch;
	}

	public String getTarget() {
		return target;
	}

	public void setTarget(String target) {
		this.target = target;
	}

	public String getEnding() {
		return ending;
	}

	public void setEnding(String ending) {
		this.ending = ending;
		this.ending2 = this.ending.concat("\n");
	}
}
